package com.mouliu.generate;

import com.lmax.disruptor.*;

import java.util.concurrent.*;

/**
 * @author mouliu
 * @create 2018-05-28-下午8:22
 */
public class disruptorMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int BUFFER_SIZE=1024;
        int THREAD_NUMBERS=4;
        /*
         * createSingleProducer创建一个单生产者的RingBuffer，
         * 第一个参数叫EventFactory，从名字上理解就是"事件工厂"，其实它的职责就是产生数据填充RingBuffer的区块。
         * 第二个参数是RingBuffer的大小，它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
         * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者（或者说是事件处理器） 太慢了)的等待策略
         */
        final RingBuffer<Trade>ringBuffer =
                RingBuffer.createSingleProducer(new EventFactory<Trade>() {
                    @Override
                    public Trade newInstance() {
                        return new Trade();
                    }
                },BUFFER_SIZE,new YieldingWaitStrategy());

        //创建线程池
        ExecutorService executors =
                Executors.newFixedThreadPool(THREAD_NUMBERS);
        //创建SequenceBarrier
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
        //创建消息处理器
        BatchEventProcessor<Trade>transProcessor =
                new BatchEventProcessor<>(ringBuffer,sequenceBarrier,new TradeHandler());

        //这一步的目的就是把消费者的位置引入到生产者 如果只有一个消费者的情况可以忽略
        ringBuffer.addGatingSequences(transProcessor.getSequence());
        //把消息处理器提交到线程池
        executors.submit(transProcessor);
        //如果存在多少个消息费者 那重复执行上面的3行代码 把TradeHandler换成其它消费者类
        Future<?>future =
                executors.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        long seq;
                        for (int i=0;i<10;i++){
                            seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块
                            //给这个区块放入 数据
                            ringBuffer.get(seq).setPrice(Math.random()*9999);
                            //发布这个区块的数据使handler(consumer)可见
                            ringBuffer.publish(seq);
                        }
                        return null;
                    }
                });
        future.get();//等待生产者结束
        Thread.sleep(1000);
        transProcessor.halt();
        executors.shutdown();
    }
}


